package x.trident.core.micro.tran.message.modular.provider;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import x.trident.core.db.api.factory.PageResultFactory;
import x.trident.core.db.api.pojo.page.PageResult;
import x.trident.core.micro.api.TranMessageServiceApi;
import x.trident.core.micro.api.enums.TranMessageStatusEnum;
import x.trident.core.micro.api.exception.MicroException;
import x.trident.core.micro.api.exception.enums.MicroExceptionEnum;
import x.trident.core.micro.api.exception.enums.TranMessageExceptionEnum;
import x.trident.core.micro.api.pojo.TranMessage;
import x.trident.core.micro.api.pojo.request.TranMessageRequest;
import x.trident.core.micro.tran.message.config.properteis.MessageProperties;
import x.trident.core.micro.tran.message.core.msg.MessageSender;
import x.trident.core.micro.tran.message.modular.service.ITranMessageService;
import x.trident.core.enums.YesOrNotEnum;
import x.trident.core.exception.base.ServiceException;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;


/**
 * 消息服务提供接口的实现
 *
 * @author Seven
 * @date 2018-04-16 22:30
 */
@RestController
@Slf4j
public class MessageServiceImpl implements TranMessageServiceApi {

    @Resource
    private ITranMessageService reliableMessageService;

    @Resource
    private MessageSender messageSender;

    @Resource
    private MessageProperties messageProperties;

    @Override
    public TranMessage preSaveMessage(@RequestBody TranMessageRequest reliableMessage) {

        // 检查消息数据的完整性
        this.checkEmptyMessage(reliableMessage);

        // 设置状态为待确认
        reliableMessage.setStatus(TranMessageStatusEnum.WAIT_VERIFY.name());

        // 标记未死亡
        reliableMessage.setAlreadyDead(YesOrNotEnum.N.name());
        reliableMessage.setMessageSendTimes(0);
        reliableMessage.setUpdateTime(new Date());

        // 实体转化
        TranMessage tranMessage = new TranMessage();
        BeanUtil.copyProperties(reliableMessage, tranMessage);

        // 存储消息
        reliableMessageService.save(tranMessage);

        return tranMessage;
    }

    @Override
    public void confirmAndSendMessage(@RequestParam("messageId") String messageId) {
        LambdaQueryWrapper<TranMessage> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TranMessage::getMessageId, messageId);
        TranMessage reliableMessage = this.reliableMessageService.getOne(wrapper);

        if (reliableMessage == null) {
            throw new ServiceException(TranMessageExceptionEnum.CANT_FIND_MESSAGE);
        }

        reliableMessage.setStatus(TranMessageStatusEnum.SENDING.name());
        reliableMessage.setUpdateTime(new Date());
        reliableMessageService.updateById(reliableMessage);

        //发送消息
        messageSender.sendMessage(reliableMessage);
    }

    @Override
    public void saveAndSendMessage(@RequestBody TranMessageRequest reliableMessage) {

        // 检查消息数据的完整性
        this.checkEmptyMessage(reliableMessage);

        reliableMessage.setStatus(TranMessageStatusEnum.SENDING.name());
        reliableMessage.setAlreadyDead(YesOrNotEnum.N.name());
        reliableMessage.setMessageSendTimes(0);
        reliableMessage.setUpdateTime(new Date());

        // 实体转化
        TranMessage tranMessage = new TranMessage();
        BeanUtil.copyProperties(reliableMessage, tranMessage);

        reliableMessageService.save(tranMessage);

        // 发送消息
        messageSender.sendMessage(tranMessage);
    }

    @Override
    public void directSendMessage(@RequestBody TranMessageRequest reliableMessage) {

        //检查消息数据的完整性
        this.checkEmptyMessage(reliableMessage);

        // 实体转化
        TranMessage tranMessage = new TranMessage();
        BeanUtil.copyProperties(reliableMessage, tranMessage);

        //发送消息
        messageSender.sendMessage(tranMessage);
    }

    @Override
    public void reSendMessage(@RequestBody TranMessageRequest reliableMessage) {

        //检查消息数据的完整性
        this.checkEmptyMessage(reliableMessage);

        //更新消息发送次数
        reliableMessage.setMessageSendTimes(reliableMessage.getMessageSendTimes() + 1);
        reliableMessage.setUpdateTime(new Date());

        // 实体转化
        TranMessage tranMessage = new TranMessage();
        BeanUtil.copyProperties(reliableMessage, tranMessage);

        reliableMessageService.updateById(tranMessage);

        //发送消息
        messageSender.sendMessage(tranMessage);
    }

    @Override
    public void reSendMessageByMessageId(@RequestParam("messageId") String messageId) {

        if (ObjectUtil.isEmpty(messageId)) {
            throw new MicroException(MicroExceptionEnum.REQUEST_EMPTY, "messageId");
        }

        TranMessage reliableMessage = getMessageByMessageId(messageId);
        reliableMessage.setMessageSendTimes(reliableMessage.getMessageSendTimes() + 1);
        reliableMessage.setUpdateTime(new Date());
        reliableMessageService.updateById(reliableMessage);

        //发送消息
        messageSender.sendMessage(reliableMessage);
    }

    @Override
    public void setMessageToAlreadlyDead(@RequestParam("messageId") String messageId) {

        if (ObjectUtil.isEmpty(messageId)) {
            throw new MicroException(MicroExceptionEnum.REQUEST_EMPTY, "messageId");
        }

        TranMessage reliableMessage = this.getMessageByMessageId(messageId);
        reliableMessage.setAlreadyDead(YesOrNotEnum.Y.name());
        reliableMessage.setUpdateTime(new Date());

        this.reliableMessageService.updateById(reliableMessage);

        //发送消息
        messageSender.sendMessage(reliableMessage);
    }

    @Override
    public TranMessage getMessageByMessageId(@RequestParam("messageId") String messageId) {

        if (ObjectUtil.isEmpty(messageId)) {
            throw new MicroException(MicroExceptionEnum.REQUEST_EMPTY, "messageId");
        }

        LambdaQueryWrapper<TranMessage> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TranMessage::getMessageId, messageId);
        List<TranMessage> reliableMessages = this.reliableMessageService.list(wrapper);
        if (reliableMessages == null || reliableMessages.size() == 0) {
            throw new ServiceException(TranMessageExceptionEnum.CANT_FIND_MESSAGE);
        } else {
            if (reliableMessages.size() > 1) {
                log.error("消息记录出现错误数据！消息id：" + messageId);
                throw new ServiceException(TranMessageExceptionEnum.MESSAGE_NUMBER_WRONG);
            } else {
                return reliableMessages.get(0);
            }
        }
    }

    @Override
    public void deleteMessageByMessageId(@RequestParam("messageId") String messageId) {

        if (ObjectUtil.isEmpty(messageId)) {
            throw new MicroException(MicroExceptionEnum.REQUEST_EMPTY, "messageId");
        }

        LambdaQueryWrapper<TranMessage> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TranMessage::getMessageId, messageId);
        this.reliableMessageService.remove(wrapper);
    }

    @Override
    public void deleteMessageByBizId(@RequestParam("bizId") Long bizId) {
        if (ObjectUtil.isEmpty(bizId)) {
            throw new MicroException(MicroExceptionEnum.REQUEST_EMPTY, "bizId");
        }

        LambdaQueryWrapper<TranMessage> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TranMessage::getBizUniqueId, bizId);
        this.reliableMessageService.remove(wrapper);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void reSendAllDeadMessageByQueueName(@RequestParam("queueName") String queueName) {

        //默认分页大小为1000
        int pageSize = 1000;
        int pageNo = 1;

        //存放查询到的所有死亡消息
        Map<String, TranMessage> resultMap = new HashMap<>();

        //循环查询所有结构（分页）
        Page<TranMessage> page = new Page<>(pageNo, pageSize);

        IPage<TranMessage> pageResult = this.reliableMessageService.page(page);

        if (pageResult == null) {
            return;
        }

        List<TranMessage> records = pageResult.getRecords();
        if (records == null || records.isEmpty()) {
            return;
        }

        //把结果放入集合
        for (TranMessage record : records) {
            resultMap.put(record.getMessageId(), record);
        }

        //循环查出剩下的还有多少,并且都放入集合
        long pages = pageResult.getPages();
        for (pageNo = 2; pageNo <= pages; pageNo++) {

            Page<TranMessage> pageTemp = new Page<>(pageNo, pageSize);

            IPage<TranMessage> secondPageResult = this.reliableMessageService.page(pageTemp);
            if (secondPageResult == null) {
                break;
            }

            List<TranMessage> secondRecords = secondPageResult.getRecords();
            if (secondRecords == null || secondRecords.isEmpty()) {
                break;
            }

            for (TranMessage record : records) {
                resultMap.put(record.getMessageId(), record);
            }
        }

        //重新发送死亡消息
        for (TranMessage reliableMessage : resultMap.values()) {
            reliableMessage.setUpdateTime(new Date());
            reliableMessage.setMessageSendTimes(reliableMessage.getMessageSendTimes() + 1);
            this.reliableMessageService.updateById(reliableMessage);

            this.messageSender.sendMessage(reliableMessage);
        }
    }

    @Override
    public PageResult<TranMessage> listPageWaitConfirmTimeOutMessages(@RequestBody TranMessageRequest pageParam) {
        Page<TranMessage> page = new Page<>(pageParam.getPageNo(), pageParam.getPageSize());

        LambdaQueryWrapper<TranMessage> wrapper = new LambdaQueryWrapper<>();
        wrapper.lt(TranMessage::getCreateTime, getCreateTimeBefore(messageProperties.getCheckIntervalSeconds()))
                .and(i -> i.eq(TranMessage::getStatus, TranMessageStatusEnum.WAIT_VERIFY.name()));
        Page<TranMessage> reliableMessagePage = this.reliableMessageService.page(page, wrapper);
        return PageResultFactory.createPageResult(reliableMessagePage);
    }

    @Override
    public PageResult<TranMessage> listPageSendingTimeOutMessages(@RequestBody TranMessageRequest pageParam) {
        Page<TranMessage> page = new Page<>(pageParam.getPageNo(), pageParam.getPageSize());

        LambdaQueryWrapper<TranMessage> wrapper = new LambdaQueryWrapper<>();
        wrapper.lt(TranMessage::getCreateTime, getCreateTimeBefore(messageProperties.getCheckIntervalSeconds()))
                .and(i -> i.eq(TranMessage::getStatus, TranMessageStatusEnum.SENDING.name()))
                .and(i -> i.eq(TranMessage::getAlreadyDead, YesOrNotEnum.N.name()));
        Page<TranMessage> reliableMessagePage = this.reliableMessageService.page(page, wrapper);
        return PageResultFactory.createPageResult(reliableMessagePage);
    }

    /**
     * 检查消息参数是否为空
     *
     * @author Seven
     * @date 2018/4/21 23:14
     */
    private void checkEmptyMessage(TranMessageRequest tranMessageRequest) {
        if (tranMessageRequest == null) {
            throw new MicroException(TranMessageExceptionEnum.REQUEST_EMPTY);
        }
        if (ObjectUtil.isEmpty(tranMessageRequest.getMessageId())) {
            throw new MicroException(TranMessageExceptionEnum.MESSAGE_ID_CANT_EMPTY);
        }
        if (ObjectUtil.isEmpty(tranMessageRequest.getMessageBody())) {
            throw new MicroException(TranMessageExceptionEnum.MESSAGE_BODY_CANT_EMPTY);
        }
        if (ObjectUtil.isEmpty(tranMessageRequest.getTopic())) {
            throw new MicroException(TranMessageExceptionEnum.QUEUE_CANT_EMPTY);
        }
    }

    /**
     * 获取某个时间间隔以前的时间 时间格式：yyyy-MM-dd HH:mm:ss
     *
     * @author Seven
     * @date 2018/5/8 22:05
     */
    public String getCreateTimeBefore(int seconds) {
        long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
        Date date = new Date(currentTimeInMillis - seconds * 1000L);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(date);
    }

}
